package com.gome.ocean.server.oracle;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import javax.sql.DataSource;

import org.apache.commons.lang.StringUtils;

import com.gome.ocean.common.enums.DataBaseType;
import com.gome.ocean.common.enums.RunMode;
import com.gome.ocean.common.utils.JdbcUtils;
import com.gome.ocean.dao.model.cango.CangoInstances;
import com.gome.ocean.server.oracle.applier.RecordApplier;
import com.gome.ocean.server.oracle.common.db.DataSourceFactory;
import com.gome.ocean.server.oracle.common.db.meta.ColumnMeta;
import com.gome.ocean.server.oracle.common.db.meta.Table;
import com.gome.ocean.server.oracle.common.db.meta.TableMetaGenerator;
import com.gome.ocean.server.oracle.common.db.rocketmq.RocketMqConfig;
import com.gome.ocean.server.oracle.common.db.rocketmq.RocketMqDataSource;
import com.gome.ocean.server.oracle.common.db.rocketmq.RocketMqSourceFactory;
import com.gome.ocean.server.oracle.common.lifecycle.AbstractYuGongLifeCycle;
import com.gome.ocean.server.oracle.common.model.DataSourceConfig;
import com.gome.ocean.server.oracle.common.model.YuGongContext;
import com.gome.ocean.server.oracle.common.stat.StatAggregation;
import com.gome.ocean.server.oracle.common.utils.YuGongUtils;
import com.gome.ocean.server.oracle.controller.YuGongInstance;
import com.gome.ocean.server.oracle.exception.YuGongException;
import com.gome.ocean.server.oracle.extractor.RecordExtractor;
import com.gome.ocean.server.oracle.positioner.RecordPositioner;
import com.gome.ocean.server.oracle.translator.DataTranslator;
import com.google.common.collect.Lists;

/**
 * 集成yugong
 * 
 * @author haqiaolong
 */
public class OracleInstancebak extends AbstractYuGongLifeCycle {

    private String                      name;                                                         // 实例名称
    private RunMode                     runMode               = RunMode.ALL;
    private CangoInstances              cangoInstances;
    private static final int            CONCURRENT_SIZE       = 5;
    private YuGongContext               globalContext;
    private DataBaseType                sourceDbType          = DataBaseType.Oracle;
    private DataBaseType                targetDbType          = DataBaseType.MySql;
    private static final String         DEFAULT_SOURCE_ENCODE = "utf-8";
    private static final int            DEFAULT_SIZE_BUFFER   = 16384;
    private static final int            DEFAULT_INTERVAL      = 5;
    private static final int            DEFAULT_RETRY_TIMES   = 5;
    private Map<String, YuGongInstance> ygInstances           = new HashMap<String, YuGongInstance>();
    private DataSourceFactory           dataSourceFactory     = new DataSourceFactory();
    private RocketMqSourceFactory       rocketMqSourceFactory = new RocketMqSourceFactory();

    public OracleInstancebak(CangoInstances cangoInstances){
        this.cangoInstances = cangoInstances;
        this.name = cangoInstances.getName();
    }

    public void start() {
        super.start();
        this.sourceDbType = DataBaseType.getDataBaseType(cangoInstances.getSourceType());
        if (this.sourceDbType == null) {
            throw new YuGongException("yugong.database.source.type should not be empty");
        }
        this.targetDbType = DataBaseType.getDataBaseType(cangoInstances.getTargetType());
        if (this.targetDbType == null) {
            throw new YuGongException("yugong.database.target.type should not be empty");
        }
        if (!dataSourceFactory.isStart()) {
            dataSourceFactory.start();
        }
        if (DataBaseType.ROCKETMQ == targetDbType) {
            if (!rocketMqSourceFactory.isStart()) {
                rocketMqSourceFactory.start();
            }
        }

        this.globalContext = initGlobalContext();

        Collection<TableHolder> tableMetas = initTables();

        for (TableHolder tableHolder : tableMetas) {
            YuGongContext context = buildContext(globalContext, tableHolder.table, tableHolder.ignoreSchema);

            RecordPositioner positioner = choosePositioner(tableHolder);
            RecordExtractor extractor = chooseExtractor(tableHolder, context, runMode, positioner);
            RecordApplier applier = chooseApplier(tableHolder, context, runMode);
            // 可能在装载DRDS时,已经加载了一次translator处理
            DataTranslator translator = tableHolder.translator;
            if (translator == null) {
                translator = choseTranslator(tableHolder);
            }
            YuGongInstance instance = new YuGongInstance(context);
            StatAggregation statAggregation = new StatAggregation(DEFAULT_SIZE_BUFFER, DEFAULT_INTERVAL);
            instance.setExtractor(extractor);
            instance.setApplier(applier);
            instance.setTranslator(translator);
            instance.setPositioner(positioner);
            instance.setStatAggregation(statAggregation);
            instance.setRetryTimes(DEFAULT_RETRY_TIMES);
            instance.setTargetDbType(targetDbType);
            instance.setNoUpdateThresold(3);
            // 设置translator的并发数
            instance.setThreadSize(CONCURRENT_SIZE);
            ygInstances.put(tableHolder.table.getFullName(), instance);
        }
        for (Entry<String, YuGongInstance> instance : ygInstances.entrySet()) {
            instance.getValue().start();
            try {
                instance.getValue().waitForDone();
            } catch (Exception e) {
                processException(instance.getValue().getContext().getTableMeta(), e);
            }
            instance.getValue().stop();
        }

        logger.info("## prepare start tables[{}] with concurrent[{}]", ygInstances.size(), CONCURRENT_SIZE);

    }

    private void processException(Table table, Exception e) {
        abort("process table[" + table.getFullName() + "] has error!", e);
        System.exit(-1);
    }

    private RecordPositioner choosePositioner(TableHolder tableHolder) {
        try {
            return null;// TODO
        } catch (Exception e) {
            throw new YuGongException(e);
        }
    }

    private DataTranslator choseTranslator(TableHolder tableHolder) {
        try {
            return null; //TODO 
        } catch (Exception e) {
            throw new YuGongException(e);
        }
    }

    private RecordApplier chooseApplier(TableHolder tableHolder, YuGongContext context, RunMode runMode) {
        return null;
    }

    private RecordExtractor chooseExtractor(TableHolder tableHolder, YuGongContext context, RunMode runMode,
                                            RecordPositioner positioner) {
        return null;
    }

    private YuGongContext buildContext(YuGongContext globalContext, Table table, boolean ignoreSchema) {
        YuGongContext result = globalContext.cloneGlobalContext();
        result.setTableMeta(table);
        if (ignoreSchema) {// 自动识别table是否为无shcema定义
            result.setIgnoreSchema(ignoreSchema);
        }
        return result;
    }

    public void stop() {
        super.stop();
        for (Entry<String, YuGongInstance> instance : ygInstances.entrySet()) {
            if (instance.getValue().isStart()) {
                instance.getValue().stop();
            }
        }
        if (dataSourceFactory.isStart()) {
            dataSourceFactory.stop();
        }
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    private YuGongContext initGlobalContext() {
        YuGongContext context = new YuGongContext();
        logger.info("check source database connection ...");
        context.setSourceDs(initSourceDataSource());
        logger.info("check source database is ok");

        logger.info("check target database connection ...");

        if (DataBaseType.ROCKETMQ == targetDbType) {
            context.setRocketMqDataSource(initRocketMqDataSource());
            logger.info("check rocketMq target database is ok");
        } else {
            context.setTargetDs(initTargetDataSource());
            logger.info("check target database is ok");
        }
        context.setSourceEncoding(DEFAULT_SOURCE_ENCODE);
        context.setTargetEncoding(DEFAULT_SOURCE_ENCODE);
        context.setBatchApply(true);
        context.setOnceCrawNum(200);
        context.setTpsLimit(2000);
        context.setIgnoreSchema(false);
        context.setSkipApplierException(false);
        context.setRunMode(runMode);
        return context;
    }

    /**
     * 初始化 rocketMq
     * 
     * @param type
     * @return
     */
    private RocketMqDataSource initRocketMqDataSource() {
        String topicname = cangoInstances.getTopicname();
        String namesrvaddr = cangoInstances.getNamesrvaddr();
        boolean serialize = cangoInstances.getSerialize().equals(1) ? true : false;
        String encode = cangoInstances.getEncode();
        RocketMqConfig rocketMqConfig = new RocketMqConfig(topicname, namesrvaddr, serialize, encode);
        return rocketMqSourceFactory.getDataSource(rocketMqConfig);
    }

    /**
     * 初始化源库信息
     * 
     * @return
     */
    private DataSource initSourceDataSource() {
        String username = cangoInstances.getSourceUserName();
        String password = cangoInstances.getSourceUserName();
        DataBaseType dbType = DataBaseType.getDataBaseType(cangoInstances.getSourceType());
        String url = JdbcUtils.getOracleJdbcUrl(cangoInstances.getSourceHost(),
            String.valueOf(cangoInstances.getSourcePort()),
            cangoInstances.getSourceDBname());
        Properties properties = new Properties();
        properties.setProperty("maxActive", "200");
        if (dbType.isMysql()) {// mysql的编码直接交给驱动去做
            properties.setProperty("characterEncoding", DEFAULT_SOURCE_ENCODE);
        }
        DataSourceConfig dsConfig = new DataSourceConfig(url, username, password, dbType, properties);
        return dataSourceFactory.getDataSource(dsConfig);
    }

    /**
     * 初始化源库信息
     * 
     * @return
     */
    private DataSource initTargetDataSource() {
        String username = cangoInstances.getTargetUserName();
        String password = cangoInstances.getTargetUserName();
        DataBaseType dbType = DataBaseType.getDataBaseType(cangoInstances.getTargetType());
        String url = JdbcUtils.getMysqlJdbcUrl(cangoInstances.getTargetHost(),
            String.valueOf(cangoInstances.getTargetPort()),
            cangoInstances.getTargetDBname());
        Properties properties = new Properties();
        properties.setProperty("maxActive", "200");
        if (dbType.isMysql()) {// mysql的编码直接交给驱动去做
            properties.setProperty("characterEncoding", DEFAULT_SOURCE_ENCODE);
        }
        DataSourceConfig dsConfig = new DataSourceConfig(url, username, password, dbType, properties);
        return dataSourceFactory.getDataSource(dsConfig);
    }

    private static class TableHolder {

        public TableHolder(Table table){
            this.table = table;
        }

        Table          table;
        boolean        ignoreSchema = false;
        DataTranslator translator   = null;

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((table == null) ? 0 : table.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null) return false;
            if (getClass() != obj.getClass()) return false;
            TableHolder other = (TableHolder) obj;
            if (table == null) {
                if (other.table != null) return false;
            } else if (!table.equals(other.table)) return false;
            return true;
        }
    }

    private Collection<TableHolder> initTables() {
        logger.info("check source tables read privileges ...");
        Set<String> tableList = cangoInstances.getTableNames();
        boolean isEmpty = true;
        for (String table : tableList) {
            isEmpty &= StringUtils.isBlank(table);
        }

        List<TableHolder> tables = Lists.newArrayList();
        DataBaseType targetDbType = globalContext.getRocketMqDataSource() != null ? DataBaseType.ROCKETMQ : YuGongUtils
            .judgeDbType(globalContext.getTargetDs());
        if (!isEmpty) {
            for (String obj : tableList) {
                String whiteTable = getTable(obj);
                String[] strs = StringUtils.split(whiteTable, ".");
                List<Table> whiteTables = null;
                boolean ignoreSchema = false;
                if (strs.length == 1) {
                    whiteTables = TableMetaGenerator.getTableMetasWithoutColumn(globalContext.getSourceDs(),
                        null,
                        strs[0]);
                    ignoreSchema = true;
                } else if (strs.length == 2) {
                    whiteTables = TableMetaGenerator.getTableMetasWithoutColumn(globalContext.getSourceDs(),
                        strs[0],
                        strs[1]);
                } else {
                    throw new YuGongException("table[" + whiteTable + "] is not valid");
                }

                if (whiteTables.isEmpty()) {
                    throw new YuGongException("table[" + whiteTable + "] is not found");
                }

                for (Table table : whiteTables) {
                    // 根据实际表名处理一下
                    TableMetaGenerator.buildColumns(globalContext.getSourceDs(), table);
                    // 构建一下拆分条件
                    DataTranslator translator = buildExtKeys(table, (String) obj, targetDbType);
                    TableHolder holder = new TableHolder(table);
                    holder.ignoreSchema = ignoreSchema;
                    holder.translator = translator;
                    if (!tables.contains(holder)) {                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
                        tables.add(holder);
                    }
                }
            }
        } else {
            List<Table> metas = TableMetaGenerator.getTableMetasWithoutColumn(globalContext.getSourceDs(), null, null);
            for (Table table : metas) {
                TableMetaGenerator.buildColumns(globalContext.getSourceDs(), table);
                // 构建一下拆分条件
                DataTranslator translator = buildExtKeys(table, null, targetDbType);
                TableHolder holder = new TableHolder(table);
                holder.translator = translator;
                if (!tables.contains(holder)) {
                    tables.add(holder);
                }
            }
        }

        logger.info("check source tables is ok.");
        return tables;
    }

    private String getTable(String tableName) {
        String[] paramArray = tableName.split("#");
        if (paramArray.length >= 1 && !"".equals(paramArray[0])) {
            return paramArray[0];
        } else {
            return null;
        }
    }

    /**
     * 尝试构建拆分字段,如果tableStr指定了拆分字段则读取之,否则在目标库找对应的拆分字段
     */
    private DataTranslator buildExtKeys(Table table, String tableStr, DataBaseType targetDbType) {
        DataTranslator translator = null;
        String extKey = getExtKey(tableStr);
        if (extKey != null) {
            // 以逗号切割
            String[] keys = StringUtils.split(StringUtils.replace(extKey, "|", ","), ",");
            List<String> newExtKeys = new ArrayList<String>();
            for (String key : keys) {
                boolean found = false;
                for (ColumnMeta meta : table.getPrimaryKeys()) {
                    if (meta.getName().equalsIgnoreCase(key)) {
                        found = true;
                        break;
                    }
                }

                if (!found) {
                    // 只增加非主键的字段
                    newExtKeys.add(key);
                }
            }

            if (newExtKeys.size() > 0) {
                extKey = StringUtils.join(newExtKeys, ",");
                table.setExtKey(extKey);

                // 调整一下原始表结构信息,将extKeys当做主键处理
                // 主要为简化extKeys变更时,等同于主键进行处理
                List<ColumnMeta> primaryKeys = table.getPrimaryKeys();
                List<ColumnMeta> newColumns = Lists.newArrayList();
                for (ColumnMeta column : table.getColumns()) {
                    boolean exist = false;
                    for (String key : newExtKeys) {
                        if (column.getName().equalsIgnoreCase(key)) {
                            primaryKeys.add(column);
                            exist = true;
                            break;
                        }
                    }

                    if (!exist) {
                        newColumns.add(column);
                    }
                }

                table.setPrimaryKeys(primaryKeys);
                table.setColumns(newColumns);
            }
        }

        return translator;
    }

    /**
     * 从表白名单中得到shardKey
     * 
     * @param tableName 带有shardkey的表, 例子 yugong_example_oracle#pk|name
     * @return
     */
    private String getExtKey(String tableName) {
        if (StringUtils.isEmpty(tableName)) {
            return null;
        }

        String[] paramArray = tableName.split("#");
        if (paramArray.length == 1) {
            return null;
        } else if (paramArray.length == 2) {
            return StringUtils.trim(paramArray[1]);
        } else {
            // 其他情况
            return null;
        }
    }

}
